feat(vector): Support writing VECTOR to parquet and avro formats using Spark #18328
rahil-c wants to merge 13 commits into apache:master from
Conversation
79398b2 to 8adeccb
@rahil-c to update pr overview
…tion test

- Write path (HoodieRowParquetWriteSupport.makeWriter) now switches on VectorElementType (FLOAT/DOUBLE/INT8) instead of hardcoding float, matching the read paths
- Remove redundant detectVectorColumns call in readBaseFile by reusing vectorCols from requiredSchema for requestedSchema
- Add testColumnProjectionWithVector covering 3 scenarios: exclude vector, vector-only, and all columns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
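As a rough illustration of what switching on the element type implies, here is a stand-in enum (not the actual Hudi `VectorElementType`; names mirror the commit message and the sizes are the standard IEEE-754/byte widths). The element size drives the fixed-length byte-array width, `dimension * elementSize`.

```java
// Illustrative stand-in for the VectorElementType switch the commit describes.
// FLOAT is 4 bytes, DOUBLE 8, INT8 1; these sizes determine the
// FIXED_LEN_BYTE_ARRAY length (dimension * elementSize).
enum VectorElementType {
  FLOAT(4), DOUBLE(8), INT8(1);

  private final int elementSize;

  VectorElementType(int elementSize) {
    this.elementSize = elementSize;
  }

  int getElementSize() {
    return elementSize;
  }
}
```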
- Use VectorLogicalType.VECTOR_BYTE_ORDER instead of hardcoded ByteOrder.LITTLE_ENDIAN in all 4 locations (write support, reader, Scala reader context, file group format)
- Add Math.multiplyExact overflow guard for dimension * elementSize in HoodieRowParquetWriteSupport
- Remove unnecessary array clone in HoodieSparkParquetReader
- Add clarifying comment on non-vector column else branch
- Fix misleading "float arrays" comment to "typed arrays"
- Move inline JavaConverters import to top-level in SparkFileFormatInternalRowReaderContext
- Import Metadata at top level instead of fully-qualified reference
- Consolidate duplicate detectVectorColumns, replaceVectorColumnsWithBinary, and convertBinaryToVectorArray into SparkFileFormatInternalRowReaderContext companion object; HoodieFileGroupReaderBasedFileFormat now delegates
- Add Javadoc on VectorType explaining it's needed for InternalSchema type hierarchy (cannot reuse HoodieSchema.Vector)
- Clean up unused imports (ByteOrder, ByteBuffer, GenericArrayData, StructField, BinaryType, HoodieSchemaType)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
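The Math.multiplyExact guard mentioned above behaves like this minimal sketch (the class and method names here are invented for illustration, not the actual HoodieRowParquetWriteSupport code): the multiplication throws instead of silently wrapping to a negative or truncated length.

```java
// Computes the FIXED_LEN_BYTE_ARRAY length as dimension * elementSize.
// Math.multiplyExact throws ArithmeticException on int overflow, so an
// absurd dimension fails fast rather than producing a corrupt length.
class FixedLengthGuard {
  static int byteSize(int dimension, int elementSize) {
    return Math.multiplyExact(dimension, elementSize);
  }
}
```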
…e types New tests added to TestVectorDataSource: - testDoubleVectorRoundTrip: DOUBLE element type end-to-end (64-dim) - testInt8VectorRoundTrip: INT8/byte element type end-to-end (256-dim) - testMultipleVectorColumns: two vector columns (float + double) in same schema with selective nulls and per-column projection - testMorTableWithVectors: MOR table type with insert + upsert, verifying merged view returns correct vectors - testCowUpsertWithVectors: COW upsert (update existing + insert new) verifying vector values after merge - testLargeDimensionVector: 1536-dim float vectors (OpenAI embedding size) to exercise large buffer allocation - testSmallDimensionVector: 2-dim vectors with edge values (Float.MaxValue) to verify boundary handling - testVectorWithNonVectorArrayColumn: vector column alongside a regular ArrayType(StringType) to ensure non-vector arrays are not incorrectly treated as vectors - testMorWithMultipleUpserts: MOR with 3 successive upsert batches of DOUBLE vectors, verifying the latest value wins per key Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ix hot-path allocation

- Create shared VectorConversionUtils utility class to eliminate duplicated vector conversion logic across HoodieSparkParquetReader, SparkFileFormatInternalRowReaderContext, and HoodieFileGroupReaderBasedFileFormat
- Add explicit dimension validation in HoodieRowParquetWriteSupport to prevent silent data corruption when array length doesn't match declared vector dimension
- Reuse GenericInternalRow in HoodieSparkParquetReader's vector post-processing loop to reduce GC pressure on large scans
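The explicit dimension validation could look roughly like the sketch below. This is a hedged stand-in: the real check lives in HoodieRowParquetWriteSupport and throws HoodieException, while here a plain IllegalStateException keeps the example self-contained.

```java
// Fails fast when the incoming array length disagrees with the declared
// vector dimension, rather than writing a malformed fixed-length value.
class DimensionCheck {
  static void validate(int actualLength, int declaredDimension, String column) {
    if (actualLength != declaredDimension) {
      throw new IllegalStateException("Vector column '" + column + "' expects "
          + declaredDimension + " elements but got " + actualLength);
    }
  }
}
```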
…eSchema.Vector] to fix Scala 2.12 type inference error
52f6db8 to 959bcd8
3f7e2d0 to f8ce228
- Move VectorConversionUtils import into hudi group (was misplaced in 3rdParty)
- Add blank line between hudi and 3rdParty import groups
- Add blank line between java and scala import groups

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@yihua @voonhous @balaji-varadarajan-ai will need a review from one of you if possible
balaji-varadarajan-ai
left a comment
Still reviewing the PR; here are the initial comments:
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
// Detect vector columns: ordinal → Vector schema
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = VectorConversionUtils.detectVectorColumns(nonNullSchema);
Seeing the pattern:
- detect vector columns
- replace schema
- post-process rows
in HoodieSparkParquetReader, SparkFileFormatInternalRowReaderContext, and HoodieFileGroupReaderBasedFileFormat. Wondering if you can bring them under one common method with a specific callback.
can look into this
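One possible shape for the consolidation the reviewer suggests is a single template method that detects vector columns, rewrites them to binary in the schema, and returns a row-level post-processing callback. The sketch below uses simple stand-in types (a Map for the schema, a generic row type, a toy "VECTOR(dim)" type string) rather than the real HoodieSchema/InternalRow types, so treat all names as hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Sketch of the suggested consolidation: one entry point owns the
// detect -> replace-schema -> post-process sequence, and callers supply
// only the per-row callback that differs between the three read paths.
class VectorReadTemplate {

  // Stand-in for detectVectorColumns: field ordinal -> dimension,
  // parsed from a toy "VECTOR(<dim>)" type string.
  static Map<Integer, Integer> detectVectorColumns(Map<Integer, String> schema) {
    Map<Integer, Integer> found = new HashMap<>();
    for (Map.Entry<Integer, String> e : schema.entrySet()) {
      String type = e.getValue();
      if (type.startsWith("VECTOR(") && type.endsWith(")")) {
        found.put(e.getKey(), Integer.parseInt(type.substring(7, type.length() - 1)));
      }
    }
    return found;
  }

  // Shared entry point: detect vector columns, replace them with BINARY in
  // the schema, and hand back the row callback wired to the detected columns.
  static <R> Function<R, R> prepare(Map<Integer, String> schema,
                                    BiFunction<R, Map<Integer, Integer>, R> postProcess) {
    Map<Integer, Integer> vectorCols = detectVectorColumns(schema);
    vectorCols.keySet().forEach(ordinal -> schema.put(ordinal, "BINARY"));
    return row -> postProcess.apply(row, vectorCols);
  }
}
```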
 * @param schema a HoodieSchema of type RECORD (or null)
 * @return map from field index to Vector schema; empty map if schema is null or has no vectors
 */
public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema schema) {
Just checking: since we are using the integer ordinal position in the schema, can you verify things work end to end with projections and schema evolution?
I believe I have tests for this in the PR but will check.
HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
int fixedSize = vectorSchema.getDimension()
    * vectorSchema.getVectorElementType().getElementSize();
return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
The vectors are stored as bare FIXED_LEN_BYTE_ARRAY in Parquet with no logical type annotation or key-value metadata on the Parquet column. I think it would be useful to track this. Any thoughts?
@balaji-varadarajan-ai so you mean we want to keep track of the Hudi type info around VECTOR within Parquet itself? If so, I think I can look into this.
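One lightweight way to track this, as the reviewer suggests, would be recording the vector type in the Parquet footer's key-value metadata so the bare FIXED_LEN_BYTE_ARRAY column becomes self-describing. The sketch below only builds the key-value map; the key names are invented for illustration (Parquet's WriteSupport lets a writer return such a map in its WriteContext, but this is not the PR's actual approach).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-column vector metadata expressed as footer
// key-value pairs. Key naming scheme is illustrative only.
class VectorFooterMetadata {
  static Map<String, String> forColumn(String column, int dimension, String elementType) {
    Map<String, String> kv = new HashMap<>();
    kv.put("hudi.vector." + column + ".dimension", Integer.toString(dimension));
    kv.put("hudi.vector." + column + ".elementType", elementType);
    return kv;
  }
}
```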
Describe the issue this Pull Request addresses
Builds on #18146 (VECTOR type in HoodieSchema) and #18190 (Spark↔HoodieSchema converters) to complete the full read/write pipeline for vector columns in Apache Hudi backed by Parquet.
Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY (little-endian, IEEE-754) rather than repeated groups.

Summary and Changelog
Write path
HoodieRowParquetWriteSupport: detects ArrayType columns annotated with hudi_type=VECTOR(dim, elementType) metadata and serialises them as FIXED_LEN_BYTE_ARRAY instead of a Parquet list. Dimension mismatch at write time throws HoodieException to prevent silent data corruption. Supported element types: FLOAT32, FLOAT64, INT8.

Read path
HoodieSparkParquetReader and SparkFileFormatInternalRowReaderContext: detect FIXED_LEN_BYTE_ARRAY columns carrying hudi_type metadata and deserialise them back to Spark ArrayData.
HoodieFileGroupReaderBasedFileFormat: propagates vector column metadata through the file-group reader so the schema is not lost during Spark's internal schema resolution.
VectorConversionUtils (new): shared utility extracted to eliminate duplicated byte-buffer decode logic across the two reader paths.

Schema / compatibility
InternalSchemaConverter: maps VectorType to/from Avro bytes with the hudi_type prop, preserving dimension and element-type metadata through the Avro layer.
HoodieSchemaCompatibilityChecker: rejects illegal vector evolution (e.g. dimension change) rather than silently coercing.
HoodieSchemaComparatorForSchemaEvolution: treats vector columns as incompatible when dimension or element type differs.
HoodieTableMetadataUtil: skips column statistics for vector columns (min/max on raw bytes is meaningless).
AvroSchemaConverterWithTimestampNTZ: passes through the hudi_type property on bytes fields so vector metadata survives Avro↔Spark schema round-trips.
Types.VectorType: adds a byteSize() helper used by the write path to compute FIXED_LEN_BYTE_ARRAY length.

Tests
TestVectorDataSource (808 lines): end-to-end Spark functional tests covering FLOAT32, FLOAT64, INT8 across COPY_ON_WRITE and MERGE_ON_READ table types; includes column projection, schema evolution rejection, and multi-batch upsert round-trips.
TestHoodieSchemaCompatibility, TestHoodieSchemaComparatorForSchemaEvolution, TestHoodieTableMetadataUtil: unit tests for schema-layer changes.

Impact
Vector columns are written as FIXED_LEN_BYTE_ARRAY. Reading those files with an older Hudi version will surface raw bytes rather than a float array; users should upgrade readers alongside writers.

Risk Level
Low. All changes are gated behind the presence of hudi_type=VECTOR(...) metadata. Tables that do not use vector columns are unaffected. New paths are covered by functional tests across both table types.

Documentation Update
A follow-up website doc page covering vector column usage (schema annotation, supported element types, Parquet layout) will be raised separately. Config changes: none.
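To make the byte layout described above concrete, here is a minimal, self-contained sketch of the FLOAT32 round trip: a vector is written as dimension * 4 little-endian IEEE-754 bytes and decoded back on read. This is illustrative only, not the actual Hudi code; the class and method names are invented, and the real write path throws HoodieException (not IllegalArgumentException) on a dimension mismatch.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative FIXED_LEN_BYTE_ARRAY round trip for a FLOAT32 vector,
// matching the little-endian IEEE-754 layout the PR describes.
class VectorRoundTrip {

  static byte[] encode(float[] vector, int declaredDimension) {
    if (vector.length != declaredDimension) {
      // Fail fast on dimension mismatch instead of writing corrupt bytes.
      throw new IllegalArgumentException("vector length " + vector.length
          + " != declared dimension " + declaredDimension);
    }
    // multiplyExact guards against overflow when sizing the buffer.
    ByteBuffer buf = ByteBuffer.allocate(Math.multiplyExact(vector.length, Float.BYTES))
        .order(ByteOrder.LITTLE_ENDIAN);
    for (float f : vector) {
      buf.putFloat(f);
    }
    return buf.array();
  }

  static float[] decode(byte[] bytes, int dimension) {
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    float[] out = new float[dimension];
    for (int i = 0; i < dimension; i++) {
      out[i] = buf.getFloat();
    }
    return out;
  }
}
```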
Contributor's checklist